package cn.tnar.parkservice.service.impl;

import cn.tnar.parkservice.config.Constant;
import cn.tnar.parkservice.service.MqService;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;

import java.util.List;
import java.util.stream.Collectors;

/**
 * @author dzx
 * @ClassName:
 * @Description:
 * @date 2019年11月22日 16:33:47
 */
@Service
@Slf4j
public class MqServiceImpl implements MqService {


    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private AmqpAdmin amqpAdmin;
    @Autowired
    private AmqpManagementOperations amqpManagementOperations;

    @Value("${spring.rabbitmq.virtual-host}")
    private String vHost;

    /**
     * 将队列绑定到交换机上并下发数据
     *
     * @param queueName
     */
    public void convertAndSend(String exchangeName, String queueName, Object object) {
        try {
            //获取交换机
            Exchange exchange = amqpManagementOperations.getExchange(vHost, exchangeName);
            //如果交换机为空，则创建交换机
            if (exchange == null) {
                amqpAdmin.declareExchange(new TopicExchange(exchangeName, true, false));
                exchange = amqpManagementOperations.getExchange(vHost, exchangeName);
            }
            //如果交换机是topic交换机类型，则强转
            TopicExchange topicExchange = null;
            if (exchange instanceof TopicExchange) {
                topicExchange = (TopicExchange) exchange;
            }
            //获取队列
            Queue queue = amqpManagementOperations.getQueue(vHost, queueName);
            //队列为空则创建队列
            if (queue == null) {
                queue = new Queue(queueName, true, false, false);
                amqpAdmin.declareQueue(queue);
            }

            List<String> bindingsForExchange = amqpManagementOperations.getBindingsForExchange(vHost, exchangeName).parallelStream()
                    .filter(x -> x != null).map(x -> {
                        return x.getDestination();
                    }).collect(Collectors.toList());
            Binding bind = BindingBuilder.bind(queue).to(topicExchange).with(queueName);
            //如果当前交换机没有任何绑定或者没有绑定当前队列，则创建绑定
            if (CollectionUtils.isEmpty(bindingsForExchange) || !bindingsForExchange.contains(bind.getDestination())) {
                amqpAdmin.declareBinding(bind);
            }
            //发送消息
            log.info("下发mq消息到{}交换机,{}队列,消息体内容：{}", exchangeName, queueName, JSON.toJSONString(object));
            rabbitTemplate.convertAndSend(exchangeName, queueName, object);
        } catch (Exception e) {
            e.printStackTrace();
            log.error("mq消息下发异常", e);
        }
    }

    public boolean mqSend(String jsonString, String topic) {
        try {
            Queue queue = amqpManagementOperations.getQueue(vHost, topic);
            if (queue == null) {
                Queue queue1 = new Queue(topic);
                amqpManagementOperations.addQueue(vHost, queue1);
                amqpAdmin.declareBinding(BindingBuilder.bind(queue1).to(new TopicExchange("amq.topic")).with(topic));
            }
            rabbitTemplate.convertAndSend("amq.topic", topic, jsonString);
            log.info(topic + "->", jsonString);
            return true;
        } catch (Exception e) {
            log.error(Constant.ERROR_MSG_MQ, jsonString);
            return false;
        }

    }
}
